Big Data and Analytics DataFrame তৈরি এবং Data Query করা গাইড ও নোট

405

অ্যাপাচি স্পার্ক (Apache Spark) একটি ডিস্ট্রিবিউটেড ডেটা প্রসেসিং ফ্রেমওয়ার্ক যা বড় ডেটাসেটের সাথে কার্যকরীভাবে কাজ করতে সক্ষম। DataFrame হল স্পার্কের একটি গুরুত্বপূর্ণ ডেটা স্ট্রাকচার, যা rows এবং columns এর মাধ্যমে ডেটা সঞ্চালন ও বিশ্লেষণ করতে সাহায্য করে। এটি SQL কুয়েরি, ফাংশনাল ট্রান্সফরমেশন, এবং অপটিমাইজড ডেটা প্রসেসিংয়ের জন্য ব্যবহার করা হয়।

এই টিউটোরিয়ালে, আমরা Apache Spark DataFrame তৈরি এবং Data Query করা এর উপর ফোকাস করব, এবং দেখব কিভাবে স্পার্কে ডেটা প্রসেসিং এবং কুয়েরি করা যায়।


1. DataFrame তৈরি করা

স্পার্কে DataFrame তৈরি করতে, প্রথমে SparkSession তৈরি করতে হবে। SparkSession হলো স্পার্কের এন্টারির এন্ট্রি পয়েন্ট, যেটি আপনাকে স্পার্ক অ্যাপ্লিকেশন পরিচালনা করতে সাহায্য করে। এরপর আপনি বিভিন্ন ডেটা সোর্স (যেমন CSV, JSON, Parquet, JDBC) থেকে ডেটা লোড করে DataFrame তৈরি করতে পারেন।

SparkSession তৈরি করা:

import org.apache.spark.sql.SparkSession

// Create a SparkSession
val spark = SparkSession.builder
  .appName("Spark DataFrame Example")
  .getOrCreate()

এখানে, SparkSession.builder স্পার্ক অ্যাপ্লিকেশন তৈরি করার জন্য ব্যবহৃত হয় এবং appName নির্ধারণ করে অ্যাপ্লিকেশনের নাম।

CSV ফাইল থেকে DataFrame তৈরি করা:

// Read CSV file into DataFrame
val df = spark.read.option("header", "true").csv("path_to_file.csv")

// Show the DataFrame
df.show()

এখানে:

  • option("header", "true"): CSV ফাইলের প্রথম লাইনে হেডার (column names) থাকবে।
  • df.show(): DataFrame এর প্রথম কিছু রেকর্ড প্রদর্শন করে।

JSON ফাইল থেকে DataFrame তৈরি করা:

val df_json = spark.read.json("path_to_file.json")
df_json.show()

Parquet ফাইল থেকে DataFrame তৈরি করা:

val df_parquet = spark.read.parquet("path_to_file.parquet")
df_parquet.show()

DataFrame Schema দেখতে:

df.printSchema()

এটি DataFrame এর স্কিমা (ফিল্ড নাম এবং টাইপ) প্রদর্শন করবে।


2. DataFrame এর উপর Data Query করা

স্পার্কে DataFrame এর মাধ্যমে ডেটা কুয়েরি করার জন্য Spark SQL ব্যবহার করা যায়। স্পার্ক SQL আপনাকে SQL এর মতো কুয়েরি লিখতে সক্ষম করে, যা ডেটা ট্রান্সফরমেশন এবং বিশ্লেষণে সহায়ক।

Select Columns:

// Select specific columns
val selectedColumns = df.select("name", "age")
selectedColumns.show()

এখানে, select() মেথডটি নির্দিষ্ট কলাম নির্বাচন করে।

Filter Rows:

// Filter rows based on condition
val filteredData = df.filter(df("age") > 30)
filteredData.show()

এখানে, filter() মেথডটি নির্দিষ্ট শর্তের ভিত্তিতে রেকর্ড ফিল্টার করে।

Group By:

// Group by a column and aggregate
val groupedData = df.groupBy("category").count()
groupedData.show()

এখানে, groupBy() মেথডটি গ্রুপিং অপারেশন পরিচালনা করে এবং count() মেথডটি প্রতিটি গ্রুপের জন্য ডেটা গণনা করে।

Order By:

// Sort the DataFrame based on a column
val sortedData = df.orderBy("age")
sortedData.show()

এখানে, orderBy() মেথডটি DataFrame কে একটি নির্দিষ্ট কলামের ভিত্তিতে সাজায়।

SQL Query ব্যবহার করে DataFrame Query করা:

স্পার্ক DataFrame থেকে SQL কুয়েরি ব্যবহার করতে, প্রথমে createOrReplaceTempView() মেথডটি ব্যবহার করে DataFrame কে একটি টেম্পোরারি ভিউ হিসেবে নিবন্ধন করতে হবে।

// Register DataFrame as a temporary view
df.createOrReplaceTempView("people")

// Query the DataFrame using SQL
val sqlResult = spark.sql("SELECT name, age FROM people WHERE age > 30")
sqlResult.show()

এখানে:

  • createOrReplaceTempView() DataFrame কে একটি SQL টেম্পোরারি ভিউ হিসেবে নিবন্ধন করে।
  • spark.sql() SQL কুয়েরি ব্যবহার করে DataFrame থেকে ডেটা নির্বাচন করা হয়।

3. DataFrame Transformation Examples

স্পার্কে DataFrame-এর ওপর বিভিন্ন ধরনের ট্রান্সফরমেশন অপারেশন করা যায়, যেমন map, flatMap, join, ইত্যাদি।

Map Transformation:

// Map transformation example
val mappedData = df.map(row => (row.getAs[String]("name"), row.getAs[Int]("age")))
mappedData.show()

এখানে, map() একটি রো থেকে নতুন একটি মান তৈরি করে এবং নতুন DataFrame তৈরি করে।

Join Operation:

// Join two DataFrames
val df1 = spark.read.json("path_to_file1.json")
val df2 = spark.read.json("path_to_file2.json")

val joinedData = df1.join(df2, df1("id") === df2("id"))
joinedData.show()

এখানে, join() মেথডটি দুটি DataFrame এর উপর join অপারেশন চালায়।


4. DataFrame Aggregation Functions

স্পার্কের DataFrame API এর মাধ্যমে বিভিন্ন aggregation functions ব্যবহার করে ডেটার উপর গাণিতিক অপারেশন করা যায়।

Count:

// Count the number of records
val count = df.count()
println(s"Record count: $count")

Sum:

// Calculate the sum of a column
val sum = df.agg(sum("age")).show()

Average:

// Calculate the average of a column
val avg = df.agg(avg("age")).show()

Min and Max:

// Calculate the min and max of a column
val minMax = df.agg(min("age"), max("age")).show()

5. Writing DataFrame to Storage

স্পার্কে DataFrame কে বিভিন্ন স্টোরেজ ফরম্যাটে যেমন CSV, Parquet, JSON ইত্যাদি ফরম্যাটে লেখা যায়।

Write DataFrame to CSV:

df.write.option("header", "true").csv("path_to_output.csv")

Write DataFrame to Parquet:

df.write.parquet("path_to_output.parquet")

Write DataFrame to JSON:

df.write.json("path_to_output.json")

Conclusion

DataFrame স্পার্কের একটি শক্তিশালী ডেটা স্ট্রাকচার যা SQL কুয়েরি, ডেটা ট্রান্সফরমেশন এবং অপটিমাইজড ডেটা প্রসেসিংয়ের জন্য ব্যবহৃত হয়। DataFrame তৈরি এবং Data Query করার মাধ্যমে আপনি বিভিন্ন ধরনের ডেটা প্রসেসিং ও বিশ্লেষণ করতে পারেন। স্পার্ক SQL এবং DataFrame API ব্যবহার করে আপনি সহজে ডেটা কুয়েরি করতে পারবেন এবং ডেটা ফিল্টার, গ্রুপ, অর্ডার, এবং আগ্রিগেট করতে পারবেন। DataFrame একটি উন্নত পারফরম্যান্স এবং কার্যকারিতা প্রদান করে, যা বড় ডেটাসেটের সাথে কাজ করার জন্য অত্যন্ত কার্যকরী।

Content added By
Promotion

Are you sure to start over?

Loading...